// Copyright (c) 2023-2025 ParadeDB, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use crate::api::operator::{anyelement_query_input_opoid, anyelement_text_opoid};
use crate::api::window_aggregate::window_agg_oid;
use crate::gucs;
use crate::nodecast;
use crate::postgres::customscan::aggregatescan::targetlist::TargetList;
use crate::postgres::customscan::builders::custom_path::{
    CustomPathBuilder, Flags, RestrictInfoType,
};
use crate::postgres::customscan::pdbscan::projections::window_agg;
use crate::postgres::customscan::qual_inspect::{extract_quals, PlannerContext, QualExtractState};
use crate::postgres::customscan::{CreateUpperPathsHookArgs, CustomScan, RelPathlistHookArgs};
use crate::postgres::rel_get_bm25_index;
use crate::postgres::utils::expr_contains_any_operator;
use once_cell::sync::Lazy;
use pgrx::{pg_guard, pg_sys, PgList, PgMemoryContexts};
use std::collections::{hash_map::Entry, HashMap};

/// Hand a provider-built [`pg_sys::CustomPath`] to the planner for `rel`.
///
/// The path is copied into `CurrentMemoryContext` before being submitted.
///
/// * If the path is parallel aware it is added as a partial path, the existing
///   `pathlist` of `rel` is discarded, and a second, non-parallel copy with a very
///   high cost is added as the regular path.
/// * Otherwise, if the path carries our private `Flags::Force` bit, the existing
///   `pathlist` of `rel` is discarded before the path is added.
///
/// `Flags::Force` is only meaningful to this function, so it is always cleared from
/// the flags before the path is copied.
///
/// # Safety
///
/// `rel` must be a valid `RelOptInfo` pointer and this must be called from within a
/// planner hook, where `CurrentMemoryContext` is the planner's context.
unsafe fn add_path(rel: *mut pg_sys::RelOptInfo, mut path: pg_sys::CustomPath) {
    let force_mask = Flags::Force as u32;
    let forced = path.flags & force_mask != 0;

    // Clear (not toggle) the flag because it's special to us. XOR would *set* the bit
    // on every path that was not forced.
    path.flags &= !force_mask;

    let mut custom_path = PgMemoryContexts::CurrentMemoryContext
        .copy_ptr_into(&mut path, std::mem::size_of_val(&path));

    if (*custom_path).path.parallel_aware {
        // add the partial path since the user-generated plan is parallel aware
        pg_sys::add_partial_path(rel, custom_path.cast());

        // remove all the existing possible paths
        (*rel).pathlist = std::ptr::null_mut();

        // then make another copy of it, increase its costs really, really high and
        // submit it as a regular path too, immediately after clearing out all the other
        // existing possible paths.
        //
        // We don't want postgres to choose this path, but we have to have at least one
        // non-partial path available for it to consider
        let copy = PgMemoryContexts::CurrentMemoryContext
            .copy_ptr_into(&mut path, std::mem::size_of_val(&path));
        (*copy).path.parallel_aware = false;
        (*copy).path.total_cost = 1000000000.0;
        (*copy).path.startup_cost = 1000000000.0;

        // will be added down below
        custom_path = copy.cast();
    } else if forced {
        // remove all the existing possible paths
        (*rel).pathlist = std::ptr::null_mut();
    }

    // add this path for consideration
    pg_sys::add_path(rel, custom_path.cast());
}

/// Install `CS` as a provider for Postgres' `set_rel_pathlist_hook`.
///
/// The hook that was installed before this call is remembered (keyed by the `TypeId`
/// of `CS`) and is invoked first whenever our callback fires. The custom scan methods
/// of `CS` are registered with Postgres as the final step.
///
/// # Panics
///
/// Panics if the same `CS` type is registered more than once.
pub fn register_rel_pathlist<CS>(_: CS)
where
    CS: CustomScan<Args = RelPathlistHookArgs> + 'static,
{
    // Previously-installed hooks, one slot per registered custom scan type.
    static mut PREV_HOOKS: Lazy<HashMap<std::any::TypeId, pg_sys::set_rel_pathlist_hook_type>> =
        Lazy::new(Default::default);

    #[pg_guard]
    extern "C-unwind" fn __priv_callback<CS>(
        root: *mut pg_sys::PlannerInfo,
        rel: *mut pg_sys::RelOptInfo,
        rti: pg_sys::Index,
        rte: *mut pg_sys::RangeTblEntry,
    ) where
        CS: CustomScan<Args = RelPathlistHookArgs> + 'static,
    {
        unsafe {
            // Look up whichever hook was active when `CS` was registered.
            #[allow(static_mut_refs)]
            let chained = PREV_HOOKS
                .get(&std::any::TypeId::of::<CS>())
                .copied()
                .flatten();

            // Give the earlier hook its turn before we add our own path.
            if let Some(prev_hook) = chained {
                prev_hook(root, rel, rti, rte);
            }

            paradedb_rel_pathlist_callback::<CS>(root, rel, rti, rte);
        }
    }

    unsafe {
        let scan_type = std::any::TypeId::of::<CS>();

        #[allow(static_mut_refs)]
        let already_registered = PREV_HOOKS.contains_key(&scan_type);
        if already_registered {
            panic!("{} is already registered", std::any::type_name::<CS>());
        }

        // Remember the current hook, then take its place.
        #[allow(static_mut_refs)]
        let _ = PREV_HOOKS.insert(scan_type, pg_sys::set_rel_pathlist_hook);
        pg_sys::set_rel_pathlist_hook = Some(__priv_callback::<CS>);

        pg_sys::RegisterCustomScanMethods(CS::custom_scan_methods())
    }
}

/// `set_rel_pathlist_hook` body for the custom scan `CS`.
///
/// Does nothing when the custom scan GUC is disabled. Otherwise asks `CS` to build a
/// custom path for `rel` and, if it produces one, submits it through `add_path`.
/// Paths generated by the core system are left for `add_path` to deal with.
#[pg_guard]
pub extern "C-unwind" fn paradedb_rel_pathlist_callback<CS>(
    root: *mut pg_sys::PlannerInfo,
    rel: *mut pg_sys::RelOptInfo,
    rti: pg_sys::Index,
    rte: *mut pg_sys::RangeTblEntry,
) where
    CS: CustomScan<Args = RelPathlistHookArgs> + 'static,
{
    unsafe {
        if !gucs::enable_custom_scan() {
            return;
        }

        let hook_args = RelPathlistHookArgs {
            root,
            rel,
            rti,
            rte,
        };
        let builder = CustomPathBuilder::new(root, rel, hook_args);

        // `None` means the scan declined to handle this relation.
        if let Some(path) = CS::create_custom_path(builder) {
            add_path(rel, path)
        }
    }
}

/// Install `CS` as a provider for Postgres' `create_upper_paths_hook`.
///
/// The hook that was installed before this call is remembered (keyed by the `TypeId`
/// of `CS`) and is invoked first whenever our callback fires. The custom scan methods
/// of `CS` are registered with Postgres as the final step.
///
/// # Panics
///
/// Panics if the same `CS` type is registered more than once.
pub fn register_upper_path<CS>(_: CS)
where
    CS: CustomScan<Args = CreateUpperPathsHookArgs> + 'static,
{
    // Previously-installed hooks, one slot per registered custom scan type.
    static mut PREV_HOOKS: Lazy<HashMap<std::any::TypeId, pg_sys::create_upper_paths_hook_type>> =
        Lazy::new(Default::default);

    #[pg_guard]
    extern "C-unwind" fn __priv_callback<CS>(
        root: *mut pg_sys::PlannerInfo,
        stage: pg_sys::UpperRelationKind::Type,
        input_rel: *mut pg_sys::RelOptInfo,
        output_rel: *mut pg_sys::RelOptInfo,
        extra: *mut ::std::os::raw::c_void,
    ) where
        CS: CustomScan<Args = CreateUpperPathsHookArgs> + 'static,
    {
        unsafe {
            // Look up whichever hook was active when `CS` was registered.
            #[allow(static_mut_refs)]
            let chained = PREV_HOOKS
                .get(&std::any::TypeId::of::<CS>())
                .copied()
                .flatten();

            // Give the earlier hook its turn before we add our own path.
            if let Some(prev_hook) = chained {
                prev_hook(root, stage, input_rel, output_rel, extra);
            }

            paradedb_upper_paths_callback::<CS>(root, stage, input_rel, output_rel, extra);
        }
    }

    unsafe {
        let scan_type = std::any::TypeId::of::<CS>();

        #[allow(static_mut_refs)]
        let already_registered = PREV_HOOKS.contains_key(&scan_type);
        if already_registered {
            panic!("{} is already registered", std::any::type_name::<CS>());
        }

        // Remember the current hook, then take its place.
        #[allow(static_mut_refs)]
        let _ = PREV_HOOKS.insert(scan_type, pg_sys::create_upper_paths_hook);
        pg_sys::create_upper_paths_hook = Some(__priv_callback::<CS>);

        pg_sys::RegisterCustomScanMethods(CS::custom_scan_methods())
    }
}

/// `create_upper_paths_hook` body for the custom scan `CS`.
///
/// Only the `UPPERREL_GROUP_AGG` stage is considered. The aggregate custom scan is
/// attempted when the query uses `pdb.agg()` (regardless of the GUC), or otherwise
/// when the `enable_aggregate_custom_scan` GUC is on. If `CS` produces a path it is
/// submitted for `output_rel` through `add_path`.
#[pg_guard]
pub extern "C-unwind" fn paradedb_upper_paths_callback<CS>(
    root: *mut pg_sys::PlannerInfo,
    stage: pg_sys::UpperRelationKind::Type,
    input_rel: *mut pg_sys::RelOptInfo,
    output_rel: *mut pg_sys::RelOptInfo,
    extra: *mut ::std::os::raw::c_void,
) where
    CS: CustomScan<Args = CreateUpperPathsHookArgs> + 'static,
{
    if stage != pg_sys::UpperRelationKind::UPPERREL_GROUP_AGG {
        return;
    }

    // pdb.agg() forces the aggregate scan on; without it the GUC decides.
    let parse = unsafe { (*root).parse };
    let has_paradedb_agg = !parse.is_null() && unsafe { query_has_paradedb_agg(parse) };
    if !(has_paradedb_agg || gucs::enable_aggregate_custom_scan()) {
        return;
    }

    unsafe {
        let hook_args = CreateUpperPathsHookArgs {
            root,
            stage,
            input_rel,
            output_rel,
            extra,
        };
        let builder = CustomPathBuilder::new(root, output_rel, hook_args);

        // `None` means the scan declined to handle this upper relation.
        if let Some(path) = CS::create_custom_path(builder) {
            add_path(output_rel, path)
        }
    }
}

/// The `planner_hook` that was installed before `register_window_aggregate_hook` ran.
///
/// This lives at module level so that the registration function (which writes it) and
/// `paradedb_planner_hook` (which reads it) share the same slot. Declaring it
/// separately inside each function creates two unrelated statics, and the previous
/// hook would never be chained.
static mut PREV_PLANNER_HOOK: pg_sys::planner_hook_type = None;

/// Register a global planner hook to intercept and modify queries before planning.
/// This is called once during extension initialization and affects all queries.
///
/// Any planner hook that was already installed is saved and is called by our hook in
/// place of `standard_planner`.
///
/// # Window Function Replacement Strategy
///
/// ## Current Approach: Early Replacement via `planner_hook`
///
/// We replace `WindowFunc` nodes with `paradedb.window_agg(json)` placeholder calls
/// **before** PostgreSQL's standard planning begins. This happens at the very start
/// of the planning process, before any path generation or optimization.
///
/// ### Why Replace Early?
///
/// 1. **Prevents WindowAgg Node Creation**: By replacing window functions before
///    `grouping_planner()` runs, we prevent PostgreSQL from creating `WindowAgg`
///    plan nodes that would try to execute our placeholder functions.
///
/// 2. **Enables TopN Integration**: Our custom scan can detect the placeholder
///    functions in the target list and handle them during `TopN` execution,
///    combining window aggregates with top-N result collection in a single pass.
///
/// 3. **Simpler Integration**: The replacement happens once, early, and the rest
///    of the planning process sees our placeholder functions as regular function
///    calls that get projected through the plan tree.
///
/// # Safety
///
/// Mutates the process-global `pg_sys::planner_hook` and a `static mut`. It must be
/// called only once; a second call would save our own hook as the "previous" one.
pub unsafe fn register_window_aggregate_hook() {
    PREV_PLANNER_HOOK = pg_sys::planner_hook;
    pg_sys::planner_hook = Some(paradedb_planner_hook);
}

/// Check if the WHERE clause can be handled by our custom scan.
///
/// This function attempts to extract quals the same way `extract_quals` does to determine
/// if WHERE clause predicates can be pushed down to the index or handled via heap filtering.
///
/// Returns:
/// - `true` if the WHERE clause can be handled
/// - `false` if the WHERE clause has predicates that cannot be handled
unsafe fn can_handle_where_clause(parse: *mut pg_sys::Query) -> bool {
    // If filter_pushdown is enabled, we can handle any WHERE clause via HeapExpr or Qual::All
    if crate::gucs::enable_filter_pushdown() {
        return true;
    }

    // Check if there's a WHERE clause at all
    if (*parse).jointree.is_null() {
        return true; // No WHERE clause
    }

    let jointree = (*parse).jointree;
    if (*jointree).quals.is_null() {
        return true; // No WHERE clause predicates
    }

    // Use the shared helper to check if we can extract quals from the WHERE clause
    // If we can't extract quals (either no index found or extraction failed), we can't handle it
    try_extract_quals_from_query(parse, (*jointree).quals)
}

/// Shared helper function to check if quals can be extracted from a Query's WHERE clause.
///
/// This function encapsulates the common logic of:
/// 1. Finding a relation with a BM25 index in the query's range table
/// 2. Attempting to extract quals using Query context
///
/// Used by `can_handle_where_clause` in the planner hook (to decide if we should replace window functions)
///
/// Returns:
/// - `true` if a BM25 index was found AND quals were successfully extracted
/// - `false` if no BM25 index was found OR quals couldn't be extracted
pub unsafe fn try_extract_quals_from_query(
    parse: *mut pg_sys::Query,
    quals_node: *mut pg_sys::Node,
) -> bool {
    // Get the range table
    let rtable = (*parse).rtable;
    if rtable.is_null() {
        return false;
    }

    let rtable_list = PgList::<pg_sys::RangeTblEntry>::from_pg(rtable);
    if rtable_list.is_empty() {
        return false;
    }

    // Find the first relation RTE with a BM25 index
    for (idx, rte_ptr) in rtable_list.iter_ptr().enumerate() {
        let rte = rte_ptr;
        if (*rte).rtekind != pg_sys::RTEKind::RTE_RELATION {
            continue;
        }

        let relid = (*rte).relid;
        let relkind = pg_sys::get_rel_relkind(relid) as u8;
        if relkind != pg_sys::RELKIND_RELATION && relkind != pg_sys::RELKIND_MATVIEW {
            continue;
        }

        // Check if this relation has a BM25 index
        let Some((_, bm25_index)) = rel_get_bm25_index(relid) else {
            continue;
        };

        // We found a relation with a BM25 index - try to extract quals
        // Use Query context since we don't have PlannerInfo yet
        let rti = (idx + 1) as pg_sys::Index; // RTI is 1-indexed
        let mut state = QualExtractState::default();
        let context = PlannerContext::from_query(parse);

        let quals = extract_quals(
            &context,
            rti,
            quals_node,
            anyelement_query_input_opoid(),
            RestrictInfoType::BaseRelation,
            &bm25_index,
            false, // Don't convert external to special qual
            &mut state,
            true, // Attempt pushdown
        );

        // CRITICAL: In Query context, if we created HeapExpr but filter_pushdown is disabled,
        // we must return false to prevent accepting queries we can't handle correctly.
        // This prevents silent data loss when WHERE clauses can't be fully pushed down.
        if state.uses_heap_expr && !crate::gucs::enable_filter_pushdown() {
            return false;
        }

        return quals.is_some();
    }

    // No BM25 index found
    false
}

/// Check if we should replace window functions in this query.
///
/// Returns `true` if:
/// - Query has window functions AND is a TopN query (ORDER BY + LIMIT)
/// - Query uses `pdb.agg()` OR the `@@@` search operator
/// - WHERE clause can be handled (or no WHERE clause)
///
/// Errors if `pdb.agg()` is used but requirements aren't met.
unsafe fn should_replace_window_functions(parse: *mut pg_sys::Query) -> bool {
    // Early return: not a SELECT query
    if parse.is_null() || (*parse).commandType != pg_sys::CmdType::CMD_SELECT {
        return false;
    }

    // Early return: no window functions
    if !query_has_window_func_nodes(parse) {
        return false;
    }

    let has_paradedb_agg = query_has_paradedb_agg(parse);

    // Check if this is a TopN query
    if !query_is_topn(parse) {
        // pdb.agg() requires TopN
        if has_paradedb_agg {
            pgrx::error!(
                "pdb.agg() window functions require ORDER BY and LIMIT clauses (TopN query)"
            );
        }
        return false;
    }

    // Check if we should handle this query (has pdb.agg or search operator)
    let has_search_operator = query_has_search_operator(parse);
    if !has_paradedb_agg && !has_search_operator {
        return false;
    }

    // Check if WHERE clause can be handled
    if !can_handle_where_clause(parse) {
        if has_paradedb_agg {
            // pdb.agg() requires that we handle the query, but we can't handle the WHERE clause
            pgrx::error!(
                "pdb.agg() window functions cannot be used with this WHERE clause because some predicates may not be pushable to the index. \
                 To fix this, enable 'SET paradedb.enable_filter_pushdown = on' to allow filtering on all fields. \
                 Alternatively, ensure all WHERE predicates use indexed fields or remove non-indexed predicates."
            );
        }
        // For non-pdb.agg queries, just don't replace - let PostgreSQL handle them
        return false;
    }

    true
}

/// Planner hook that replaces WindowFunc nodes before PostgreSQL processes them.
///
/// This hook runs at the very start of query planning, before `grouping_planner()`
/// and before any path generation. It performs a one-time replacement of window
/// functions with placeholder function calls that our custom scans can detect
/// and handle during execution.
///
/// After the (optional) rewrite, planning is delegated to the planner hook that was
/// installed before ours, or to `standard_planner` when there was none.
#[pg_guard]
unsafe extern "C-unwind" fn paradedb_planner_hook(
    parse: *mut pg_sys::Query,
    query_string: *const ::core::ffi::c_char,
    cursor_options: ::core::ffi::c_int,
    bound_params: pg_sys::ParamListInfo,
) -> *mut pg_sys::PlannedStmt {
    // Check if we should replace window functions and do so if needed
    if should_replace_window_functions(parse) {
        replace_windowfuncs_recursively(parse);
    }

    // Copy the saved hook out of the shared static (no reference to the `static mut`
    // is created), then call it or fall back to the standard planner.
    let prev_hook = PREV_PLANNER_HOOK;
    match prev_hook {
        Some(prev_hook) => prev_hook(parse, query_string, cursor_options, bound_params),
        None => pg_sys::standard_planner(parse, query_string, cursor_options, bound_params),
    }
}

/// Report whether any expression in `target_list` contains a `WindowFunc` node.
///
/// Each non-null target entry expression is traversed with `expression_tree_walker`;
/// the search stops at the first `WindowFunc` encountered. This runs in the planner
/// hook, BEFORE window functions are replaced with placeholders.
unsafe fn targetlist_has_window_func_nodes(target_list: *mut pg_sys::List) -> bool {
    /// State shared with the walker through its opaque context pointer.
    struct WalkerContext {
        found: bool,
    }

    #[pg_guard]
    unsafe extern "C-unwind" fn walker(
        node: *mut pg_sys::Node,
        context: *mut core::ffi::c_void,
    ) -> bool {
        if node.is_null() {
            return false;
        }

        let is_window_func = nodecast!(WindowFunc, T_WindowFunc, node).is_some();
        if !is_window_func {
            // Not a match here: descend into the children.
            return pg_sys::expression_tree_walker(node, Some(walker), context);
        }

        // Record the hit and return true so the traversal stops.
        let state = context.cast::<WalkerContext>();
        (*state).found = true;
        true
    }

    let mut context = WalkerContext { found: false };

    let entries = PgList::<pg_sys::TargetEntry>::from_pg(target_list);
    for entry in entries.iter_ptr() {
        let expr = (*entry).expr;
        if expr.is_null() {
            continue;
        }

        walker(
            expr as *mut pg_sys::Node,
            &mut context as *mut _ as *mut core::ffi::c_void,
        );
        if context.found {
            return true;
        }
    }

    false
}

/// Check if the query (or any subquery) contains window functions (WindowFunc nodes).
///
/// This is called BEFORE window function replacement in the planner hook.
/// Subqueries found in the range table are only inspected (recursively) when
/// `SUBQUERY_SUPPORT` is enabled.
///
/// Returns `false` for a null `parse`.
unsafe fn query_has_window_func_nodes(parse: *mut pg_sys::Query) -> bool {
    if parse.is_null() {
        return false;
    }

    // Check the current query's target list
    if !(*parse).targetList.is_null() && targetlist_has_window_func_nodes((*parse).targetList) {
        return true;
    }

    // Check subqueries in RTEs (only if SUBQUERY_SUPPORT is enabled)
    if window_agg::window_aggregates::SUBQUERY_SUPPORT && !(*parse).rtable.is_null() {
        let rtable = PgList::<pg_sys::RangeTblEntry>::from_pg((*parse).rtable);
        // The position of the RTE is irrelevant here, so iterate without `enumerate()`.
        for rte in rtable.iter_ptr() {
            if (*rte).rtekind == pg_sys::RTEKind::RTE_SUBQUERY
                && !(*rte).subquery.is_null()
                && query_has_window_func_nodes((*rte).subquery)
            {
                return true;
            }
        }
    }

    false
}

/// Report whether the `@@@` search operator appears anywhere in the query.
///
/// Both operator variants are looked for (the query-input and the text right-hand
/// side), in this order: the WHERE clause, the HAVING clause, every target list
/// expression, and finally each subquery in the range table (recursively).
///
/// This is only a presence check used for planner decisions. Turning the operators
/// into executable quals is the job of `extract_quals` in qual_inspect.rs.
///
/// `parse` must be non-null.
unsafe fn query_has_search_operator(parse: *mut pg_sys::Query) -> bool {
    let target_ops = [anyelement_query_input_opoid(), anyelement_text_opoid()];

    // WHERE clause (jointree->quals)
    let jointree = (*parse).jointree;
    if !jointree.is_null() {
        let where_quals = (*jointree).quals;
        if !where_quals.is_null() && expr_contains_any_operator(where_quals, &target_ops) {
            return true;
        }
    }

    // HAVING clause
    let having = (*parse).havingQual;
    if !having.is_null() && expr_contains_any_operator(having, &target_ops) {
        return true;
    }

    // SELECT expressions (plain expressions, aggregates, window functions)
    let raw_tlist = (*parse).targetList;
    if !raw_tlist.is_null() {
        let entries = PgList::<pg_sys::TargetEntry>::from_pg(raw_tlist);
        for entry in entries.iter_ptr() {
            let expr = (*entry).expr as *mut pg_sys::Node;
            if !expr.is_null() && expr_contains_any_operator(expr, &target_ops) {
                return true;
            }
        }
    }

    // Subqueries, recursively
    let raw_rtable = (*parse).rtable;
    if !raw_rtable.is_null() {
        let range_table = PgList::<pg_sys::RangeTblEntry>::from_pg(raw_rtable);
        for rte in range_table.iter_ptr() {
            if (*rte).rtekind != pg_sys::RTEKind::RTE_SUBQUERY {
                continue;
            }

            let subquery = (*rte).subquery;
            if !subquery.is_null() && query_has_search_operator(subquery) {
                return true;
            }
        }
    }

    false
}

/// A query counts as TopN when it has both an ORDER BY (`sortClause`) and a LIMIT
/// (`limitCount`). A null `parse` is never TopN.
unsafe fn query_is_topn(parse: *mut pg_sys::Query) -> bool {
    !parse.is_null() && !(*parse).sortClause.is_null() && !(*parse).limitCount.is_null()
}

/// Report whether `pdb.agg()` is used anywhere in the query, either as a window
/// function (`WindowFunc`) or as a plain aggregate (`Aggref`).
///
/// The target list, the HAVING clause and every range-table subquery (recursively)
/// are searched with `expression_tree_walker`. When this returns `true` we MUST handle
/// the query ourselves, even if no `@@@` operator is present.
///
/// `parse` must be non-null.
///
/// NOTE: This logic is duplicated with similar checks in other parts of the codebase.
/// Both need to identify pdb.agg() calls, so changes to one should be reflected in the other.
/// TODO: Consider unifying this logic to avoid duplication (see GitHub issue #3455)
unsafe fn query_has_paradedb_agg(parse: *mut pg_sys::Query) -> bool {
    use crate::api::agg_funcoid;

    /// State shared with the walker through its opaque context pointer.
    struct WalkerContext {
        paradedb_agg_oid: u32,
        found: bool,
    }

    #[pg_guard]
    unsafe extern "C-unwind" fn walker(
        node: *mut pg_sys::Node,
        context: *mut core::ffi::c_void,
    ) -> bool {
        if node.is_null() {
            return false;
        }

        let state = context.cast::<WalkerContext>();
        let wanted_oid = (*state).paradedb_agg_oid;

        // pdb.agg() used as a window function?
        let window_hit = match nodecast!(WindowFunc, T_WindowFunc, node) {
            Some(window_func) => (*window_func).winfnoid.to_u32() == wanted_oid,
            None => false,
        };

        // ...or as a regular aggregate (GROUP BY context)?
        let aggregate_hit = !window_hit
            && match nodecast!(Aggref, T_Aggref, node) {
                Some(aggref) => (*aggref).aggfnoid.to_u32() == wanted_oid,
                None => false,
            };

        if window_hit || aggregate_hit {
            (*state).found = true;
            return true; // Stop walking
        }

        // No match at this node: keep descending.
        pg_sys::expression_tree_walker(node, Some(walker), context)
    }

    let mut context = WalkerContext {
        paradedb_agg_oid: agg_funcoid().to_u32(),
        found: false,
    };

    // Target list
    let raw_tlist = (*parse).targetList;
    if !raw_tlist.is_null() {
        let entries = PgList::<pg_sys::TargetEntry>::from_pg(raw_tlist);
        for entry in entries.iter_ptr() {
            let expr = (*entry).expr as *mut pg_sys::Node;
            if expr.is_null() {
                continue;
            }

            walker(expr, &mut context as *mut _ as *mut core::ffi::c_void);
            if context.found {
                return true;
            }
        }
    }

    // HAVING clause
    let having = (*parse).havingQual;
    if !having.is_null() {
        walker(having, &mut context as *mut _ as *mut core::ffi::c_void);
        if context.found {
            return true;
        }
    }

    // Subqueries, recursively
    let raw_rtable = (*parse).rtable;
    if !raw_rtable.is_null() {
        let range_table = PgList::<pg_sys::RangeTblEntry>::from_pg(raw_rtable);
        for rte in range_table.iter_ptr() {
            if (*rte).rtekind != pg_sys::RTEKind::RTE_SUBQUERY {
                continue;
            }

            let subquery = (*rte).subquery;
            if !subquery.is_null() && query_has_paradedb_agg(subquery) {
                return true;
            }
        }
    }

    false
}

/// Recursively replace window functions in the query and all subqueries.
///
/// The current query is processed first: its window functions are extracted and
/// converted, and if any were found they are replaced in the target list. Subqueries
/// in the range table are then processed the same way, but only when
/// `SUBQUERY_SUPPORT` is enabled; otherwise their window functions are left for
/// PostgreSQL to handle.
///
/// A null `parse` is a no-op.
unsafe fn replace_windowfuncs_recursively(parse: *mut pg_sys::Query) {
    if parse.is_null() {
        return;
    }

    // Extract window functions from current query
    let window_tls = window_agg::extract_and_convert_window_functions(parse);
    if !window_tls.is_empty() {
        // Replace window functions in current query
        replace_windowfuncs_in_query(parse, &window_tls);
    }

    // The subquery-support switch does not change per RTE, so decide once up front
    // instead of re-checking it for every range table entry.
    if !window_agg::window_aggregates::SUBQUERY_SUPPORT || (*parse).rtable.is_null() {
        return;
    }

    // Recursively process subqueries in RTEs
    let rtable = PgList::<pg_sys::RangeTblEntry>::from_pg((*parse).rtable);
    for rte in rtable.iter_ptr() {
        if (*rte).rtekind == pg_sys::RTEKind::RTE_SUBQUERY && !(*rte).subquery.is_null() {
            replace_windowfuncs_recursively((*rte).subquery);
        }
    }
}

/// Replace WindowFunc nodes in the Query's target list with placeholder functions.
///
/// Takes a map of target_entry_index -> TargetList and replaces each top-level
/// `WindowFunc` target entry that has a mapping with a `paradedb.window_agg(json)`
/// call whose single argument is the serialized `TargetList`.
///
/// The target list is rebuilt from flat copies of every entry. Entries that are not
/// a `WindowFunc`, or that have no mapping in `window_tls`, are copied unchanged.
///
/// Nothing is modified when the query has no target list, or when the
/// `window_agg` function cannot be resolved yet (e.g. during extension creation).
///
/// # Panics
///
/// Panics if a `TargetList` cannot be serialized to JSON or if the JSON contains an
/// interior NUL byte.
unsafe fn replace_windowfuncs_in_query(
    parse: *mut pg_sys::Query,
    window_tls: &HashMap<usize, TargetList>,
) {
    if (*parse).targetList.is_null() {
        return;
    }

    // If window_agg function doesn't exist yet (e.g., during extension creation), skip replacement
    let window_agg_procid = window_agg_oid();
    if window_agg_procid == pg_sys::InvalidOid {
        return;
    }

    let original_tlist = PgList::<pg_sys::TargetEntry>::from_pg((*parse).targetList);
    let mut new_targetlist = PgList::<pg_sys::TargetEntry>::new();

    for (idx, te) in original_tlist.iter_ptr().enumerate() {
        // Every entry in the rebuilt list is a flat copy, so the new list never shares
        // TargetEntry nodes with the list it replaces.
        let new_te = pg_sys::flatCopyTargetEntry(te);

        let is_window_func = nodecast!(WindowFunc, T_WindowFunc, (*te).expr).is_some();
        if is_window_func {
            // Only WindowFuncs that were converted upstream have a mapping; the rest
            // keep their original expression.
            if let Some(window_tl) = window_tls.get(&idx) {
                let json = serde_json::to_string(window_tl)
                    .expect("Failed to serialize WindowSpecification");

                // Create a Const node for the JSON string
                let json_cstring = std::ffi::CString::new(json).expect("Invalid JSON string");
                let json_text = pg_sys::cstring_to_text(json_cstring.as_ptr());
                let json_datum = pg_sys::Datum::from(json_text as usize);

                // Create an argument list with the JSON string
                let mut args = PgList::<pg_sys::Node>::new();
                let json_const = pg_sys::makeConst(
                    pg_sys::TEXTOID,
                    -1,
                    pg_sys::DEFAULT_COLLATION_OID,
                    -1,
                    json_datum,
                    false, // not null
                    false, // not passed by value (text is varlena)
                );
                args.push(json_const.cast());

                // Create a FuncExpr that calls paradedb.window_agg(json)
                let funcexpr = pg_sys::makeFuncExpr(
                    window_agg_procid,
                    window_tl.singleton_result_type_oid(),
                    args.into_pg(),
                    pg_sys::InvalidOid,
                    pg_sys::InvalidOid,
                    pg_sys::CoercionForm::COERCE_EXPLICIT_CALL,
                );

                // Replace the WindowFunc with our placeholder FuncExpr
                (*new_te).expr = funcexpr.cast();
            }
        }

        new_targetlist.push(new_te);
    }

    (*parse).targetList = new_targetlist.into_pg();
}
